/**
 *Copyright 2013 by dragon.
 *
 *File name: BitTorrentDownloader.java
 *Author:      dragon
 *Email:       fufulove2012@gmail.com
 *Blog:        http://blog.csdn.net/xidomlove
 *Version:     1.0.0
 *Date:        2013-10-7 下午1:30:16
 *Description: BitTorrent协议实现，参考文档：http://blog.csdn.net/wangpingfang/article/category/546550 https://wiki.theory.org/BitTorrent_Tracker_Protocol
 *https://wiki.theory.org/BitTorrentSpecification#bitfield:_.3Clen.3D0001.2BX.3E.3Cid.3D5.3E.3Cbitfield.3E
 */
package com.dragon.jaxel.bittorrent;

import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.lang.reflect.InvocationTargetException;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.TimerTask;
import java.util.Map.Entry;
import java.util.UUID;

import com.dragon.jaxel.Downloader;
import com.dragon.jaxel.bittorrent.bencode.Bencode;
import com.dragon.jaxel.bittorrent.bencode.BencodeDictionary;
import com.dragon.jaxel.bittorrent.bencode.BencodeList;
import com.dragon.jaxel.bittorrent.bencode.BencodeString;
import com.dragon.jaxel.bittorrent.bencode.BencodeValue;
import com.dragon.log.Logger;

/**
 * @author dragon8
 * 
 */
public class BitTorrentDownloader implements Downloader {

	/**
	 * 种子文件元信息
	 */
	private BencodeDictionary metaData = null;

	/**
	 * 待下载的文件在种子文件中文件列表的的索引，如果存在多文件的话,如果不是多个文件则为-1
	 */
	private int fileIndex;

	private long fileOffset;

	private PieceManager pieceManager;
	/**
	 * 保存的文件名
	 */
	private String saveFileName = null;

	/**
	 * 文件长度
	 */
	private long size = -1;

	private long progress = 0;
	/**
	 * bit torrent下载客户端名称
	 */
	final static String PEER_ID_PREFIX_STRING = "-SD0010-";

	/**
	 * 下载线程
	 */
	private Downloader downloader;

	public BitTorrentDownloader() {
		super();
		// TODO Auto-generated constructor stub
	}

	/**
	 * @param bencoding
	 *            种子文件元信息
	 * @param fileIndex
	 *            待下载的文件在种子文件中文件列表的的索引，如果存在多文件的话,否则传-1
	 * @param saveFileName
	 *            指定保存文件名
	 * @throws IOException
	 */
	public BitTorrentDownloader(BencodeDictionary metaData, int fileIndex,
			long fileOffset, String saveFileName) throws IOException {
		// TODO Auto-generated constructor stub
		this.metaData = metaData;
		this.fileIndex = fileIndex;
		this.saveFileName = saveFileName;
		this.fileOffset = fileOffset;
		pieceManager = new PieceManager();
		downloader = new Downloader();
		pieceManager.init(this.fileOffset, getSize(), downloader.pieceLength);
	}

	@Override
	public String getProtocol() {
		// TODO Auto-generated method stub
		return "bit torrent";
	}

	@Override
	public boolean isDownLoading() {
		// TODO Auto-generated method stub
		return true;
	}

	@Override
	public boolean isCompleted() {
		// TODO Auto-generated method stub
		return false;
	}

	@Override
	public String getSavePathString() {
		// TODO Auto-generated method stub
		return null;
	}

	@Override
	public long getSize() {
		// TODO Auto-generated method stub
		if (size != -1) {
			return size;
		}
		BencodeDictionary dictionary = metaData.get("info").getAsDictionary();
		BencodeList files = (BencodeList) dictionary.get("files");
		if (files == null) {
			size = dictionary.get("length").getAsBencodeNumber().getData();
		} else {
			size = files.get(fileIndex).getAsDictionary().get("length")
					.getAsBencodeNumber().getData();
		}
		return size;
	}

	@Override
	public long getProgress() {
		// TODO Auto-generated method stub
		return progress;
	}

	@Override
	public void deleteSateFile() {
		// TODO Auto-generated method stub

	}

	@Override
	public void saveState() throws IOException {
		// TODO Auto-generated method stub

	}

	@Override
	public boolean start() throws IOException {
		// TODO Auto-generated method stub
		downloader.start();
		return true;

	}

	@Override
	public boolean start(int connectionCount) throws IOException {
		// TODO Auto-generated method stub
		return start();
	}

	@Override
	public void stop() {
		// TODO Auto-generated method stub

	}

	@Override
	public void pause() {
		// TODO Auto-generated method stub

	}

	@Override
	public boolean resume() throws IOException {
		// TODO Auto-generated method stub
		return false;
	}

	@Override
	public void loadState(ObjectInputStream inputStream)
			throws FileNotFoundException, IOException, ClassNotFoundException,
			InstantiationException, IllegalAccessException,
			IllegalArgumentException, InvocationTargetException,
			NoSuchMethodException, SecurityException {
		// TODO Auto-generated method stub

	}

	private static String urlEncode(byte[] bs) {
		StringBuffer sb = new StringBuffer(bs.length * 3);
		for (int i = 0; i < bs.length; i++) {
			int c = bs[i] & 0xFF;
			sb.append('%');
			if (c < 16) {
				sb.append('0');
			}
			sb.append(Integer.toHexString(c));
		}
		return sb.toString();
	}

	/**
	 * 连接到一个peer
	 * 
	 * @param peer
	 * @param selector
	 */
	static boolean ConnectPeer(Peer peer, Selector selector) {
		try {
			peer.connect();
			peer.registerEvent(selector);
			return true;
		} catch (IOException e) {
			// TODO Auto-generated catch block
			// 连接失败，减少优先级
			peer.decreasePriority();
			Logger.getDefaultLogger()
					.debug(String
							.format("Bit torrent network error: Connect to %s:%d failed.",
									peer.address.getHostName(),
									peer.address.getPort()));
		}
		return false;
	}

	class Downloader implements Runnable {

		public Downloader() {
			super();
			// TODO Auto-generated constructor stub
			pieceLength = (int) metaData.get("info").getAsDictionary()
					.get("piece length").getAsBencodeNumber().getData();
		}

		/**
		 * 握手消息的字节数组，长度20
		 */
		private byte[] handShakeMessage;
		/**
		 * 种子文件中info字段的sha-1 hash
		 */
		private byte[] infoHash;
		/**
		 * 本机的peer id字段的字节数组，长度20
		 */
		private byte[] peerId;
		/**
		 * 当前正在运行的线程对象
		 */
		Thread thread;

		/**
		 * 一次请求的文件块大小
		 */
		final static int PEER_REQUESET_SIZE = 16 * 1024;

		FileWriter fileWriter;

		/**
		 * 指明客户端在下一次连接Tracker前所需等待的时间，以秒为单位
		 */
		int interval;

		/**
		 * 种子文件中每个片的长度
		 */
		private final int pieceLength;

		/**
		 * 所有peer的集合
		 */
		private ArrayList<Peer> peerList = new ArrayList<Peer>();

		private ServerSocketChannel serverSocketChannel = null;

		boolean bRunning = false;

		/**
		 * 开始下载
		 */
		void start() {
			bRunning = true;
			thread = new Thread(this);
			thread.start();
		}

		/**
		 * 停止下载
		 * 
		 * @throws InterruptedException
		 */
		void stop() throws InterruptedException {
			bRunning = false;
			thread.join();
			thread = null;
		}

		/**
		 * 查询tracker
		 * 
		 * @return
		 * @throws IOException
		 */
		BencodeDictionary queryTracker() throws IOException {
			String urlString = metaData.get("announce").getAsBencodeString()
					.getData();
			Logger.getDefaultLogger().debug(
					"query tracker, info hash:" + bytArrayToHex(infoHash));
			// tracker服务器采用的是http协议
			if (urlString.startsWith("http")) {
				// 保存url参数的map
				HashMap<String, String> params = new HashMap<String, String>();
				// 构造url参数
				params.put("info_hash", urlEncode(infoHash));
				String pId = String.format("%s%s", PEER_ID_PREFIX_STRING,
						UUID.randomUUID().toString().split("-")[4]).substring(
						0, 20);
				peerId = pId.getBytes();
				params.put("peer_id", URLEncoder.encode(pId, "utf-8"));
				params.put("port", String
						.valueOf(((InetSocketAddress) serverSocketChannel
								.getLocalAddress()).getPort()));
				params.put("uploaded", String.valueOf(0));
				params.put("downloaded", String.valueOf(0));
				params.put("left", String.valueOf(getSize()));
				params.put("compact", String.valueOf(1));
				params.put("event", "started");
				params.put("no_peer_id", "nothing");
				urlString += "?";
				Iterator<Entry<String, String>> iter = params.entrySet()
						.iterator();
				while (iter.hasNext()) {
					Entry<String, String> entry = iter.next();
					urlString += entry.getKey() + "=" + entry.getValue() + "&";
				}
				// http请求track server
				URL url = new URL(
						urlString.substring(0, urlString.length() - 1));
				HttpURLConnection connection = (HttpURLConnection) url
						.openConnection();
				connection.connect();
				if (connection.getResponseCode() / 100 == 2) {
					BufferedInputStream inputStream = new BufferedInputStream(
							connection.getInputStream());
					// 对服务器返回进行解码
					BencodeDictionary dic = Bencode.decodeStream(inputStream)
							.getAsDictionary();
					inputStream.close();
					BencodeValue failReasonString = dic.get("failure reason");
					if (failReasonString != null) {
						// Tracker服务器返回错误
						Logger.getDefaultLogger().error(
								"Tracker esponse fail error: "
										+ failReasonString.getAsBencodeString()
												.getData());
					} else {
						// 请求成功
						return dic;
					}
				} else {
				}

			} else {

			}
			return null;
		}

		/**
		 * 初始化下载所需的信息
		 */
		boolean initDownload() {
			try {
				// 编码info hash
				byte[] bytes = Bencode.encode(metaData.get("info"));
				// sha1 加密 info hash
				MessageDigest messageDigest = MessageDigest
						.getInstance("SHA-1");
				messageDigest.update(bytes);
				infoHash = messageDigest.digest();
			} catch (NoSuchAlgorithmException e) {
				// TODO Auto-generated catch block
				Logger.getDefaultLogger().info(
						"SHA-1 encode failed. " + e.getMessage());
				return false;
			} catch (IOException e) {
				// TODO Auto-generated catch block
				Logger.getDefaultLogger().debug(e.getMessage());
				return false;
			}

			try {
				if (!listen()) {
					// 打开监听套接字出错，退出
					return false;
				}
				BencodeDictionary dictionary = queryTracker();
				// 指明客户端在下一次连接Tracker前所需等待的时间，以秒为单位
				interval = (int) dictionary.get("interval")
						.getAsBencodeNumber().getData();
				// peer列表
				BencodeString peers = (BencodeString) dictionary.get("peers");
				// IPV6的peer列表，如果存在的话
				// BencodeList peers_ipv6 = (BencodeList)
				// dictionary.get("peers_ipv6");

				// 客户端握手协议消息生成
				ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
				byteArrayOutputStream.write(19);
				byteArrayOutputStream.write(new String("BitTorrent protocol")
						.getBytes());
				for (int i = 0; i < 8; i++) {
					byteArrayOutputStream.write(0);
				}
				byteArrayOutputStream.write(infoHash);
				byteArrayOutputStream.write(peerId);
				handShakeMessage = byteArrayOutputStream.toByteArray();

				// 构造peer列表
				for (int i = 0; i < peers.getBytes().length; i += 6) {
					byte[] ip = Arrays.copyOfRange(peers.getBytes(), i, i + 4);
					int port = (peers.getBytes()[i + 4] & 0xff) << 8
							| (0xff & peers.getBytes()[i + 5]);
					peerList.add(new Peer(new InetSocketAddress(InetAddress
							.getByAddress(ip), port)));
				}
				return true;
			} catch (IOException e) {
				// TODO Auto-generated catch block
				Logger.getDefaultLogger().debug(e.getMessage());
				return false;
			}
		}

		/**
		 * 打开监听套接字
		 * 
		 * @return
		 */
		private boolean listen() {
			// TODO Auto-generated method stub
			try {
				serverSocketChannel = ServerSocketChannel.open();
			} catch (IOException e1) {
				// TODO Auto-generated catch block
				return false;
			}
			for (int port = 6881; port < 6889; port++) {
				try {
					serverSocketChannel.bind(new InetSocketAddress(port));
					break;
				} catch (Exception e) {
					// TODO: handle exception
				}
			}
			if (serverSocketChannel == null) {
				try {
					serverSocketChannel.bind(new InetSocketAddress(0));
				} catch (IOException e) {
					// TODO Auto-generated catch block
					Logger.getDefaultLogger().debug(e.getMessage());
					return false;
				}
			}
			return true;
		}

		@Override
		public void run() {
			// TODO Auto-generated method stub
			if (!initDownload()) {
				// 初始化错误，退出
				return;
			}

			// 打开文件
			File file = new File(saveFileName);
			fileWriter = new FileWriter();
			fileWriter.open(file);
			fileWriter.truncate(getSize());

			// 注册事件并连接peers
			Selector selector = null;
			try {
				selector = Selector.open();
			} catch (IOException e1) {
				// TODO Auto-generated catch block
				Logger.getDefaultLogger().info(
						"Open selector Filed" + e1.getMessage());
			}

			// 初始化所有连接
			// for (Peer peer : peerList) {
			// ConnectPeer(peer, selector);
			// }
			ConnectPeer(peerList.get(0), selector);
			// 轮询事件
			try {
				serverSocketChannel.configureBlocking(false);
			} catch (IOException e3) {
				// TODO Auto-generated catch block
				e3.printStackTrace();
			}
			try {
				serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
			} catch (ClosedChannelException e2) {
				// TODO Auto-generated catch block
				e2.printStackTrace();
			}
			long lastTime = System.currentTimeMillis();
			while (bRunning) {

				// 发送keep alive消息
				long now = System.currentTimeMillis();
				if (now - lastTime > 60000) {
					Logger.getDefaultLogger().debug(
							"Send KeepAlive message to peers");
					// 1分钟已经过去了
					lastTime = now;
					for (Peer peer : peerList) {
						if (peer.isBitfieldRecieved && peer.isConnected()) {
							peer.sendMessage(PeerMessage.wrapKeepAlive());
						}
					}
				}

				// 检查事件数量
				try {
					if (selector.select() <= 0) {
						continue;
					}
				} catch (IOException e) {
					// TODO Auto-generated catch block
					Logger.getDefaultLogger().debug(e.getMessage());
					break;
				}

				Iterator<SelectionKey> iterator = selector.selectedKeys()
						.iterator();

				while (iterator.hasNext()) {
					SelectionKey selectionKey = iterator.next();
					iterator.remove();
					try {
						process(selectionKey);
					} catch (IOException e) {
						Logger.getDefaultLogger().debug(e.getMessage());
						closePeerConnection(selectionKey);
					}
				}
			}
			// 释放资源
			try {
				selector.close();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				Logger.getDefaultLogger().debug(
						"Close selector error: " + e.getMessage());
			}
			fileWriter.close();
			Logger.getDefaultLogger().debug("Download thread exit.");
			bRunning = false;
		}

		void closePeerConnection(SelectionKey selectionKey) {
			Peer peer = (Peer) selectionKey.attachment();
			InetSocketAddress address = peer.address;
			Logger.getDefaultLogger().info(
					String.format("Connction closed. host=%s port=%d",
							address.getHostName(), address.getPort()));
			peer.close();
			peerList.remove(peer);
			selectionKey.cancel();
		}

		// 处理socket事件
		private void process(SelectionKey selectionKey) throws IOException {
			// TODO Auto-generated method stub

			Peer peer = (Peer) selectionKey.attachment();

			if (selectionKey.isConnectable()) {

				SocketChannel socketChannel = (SocketChannel) selectionKey
						.channel();
				// 连接事件
				boolean success = socketChannel.finishConnect();
				if (!success) {
					closePeerConnection(selectionKey);
				} else {

					InetSocketAddress address = (InetSocketAddress) socketChannel
							.getRemoteAddress();
					Logger.getDefaultLogger().info(
							String.format(
									"Connction established. host=%s port=%d",
									address.getHostName(), address.getPort()));
					if (!peer.isPassive) {
						peer.sendMessage(ByteBuffer.wrap(handShakeMessage));
					}
				}
			} else if (selectionKey.isReadable()) {

				SocketChannel socketChannel = (SocketChannel) selectionKey
						.channel();
				// socket可读
				ByteBuffer buffer = peer.peerMessageBuffer;

				if (!peer.isHandshakeRecieved) {
					// 读取握手信息
					buffer.limit(68);
					int len = socketChannel.read(buffer);
					if (len < 0) {
						closePeerConnection(selectionKey);
					} else if (buffer.position() == 68) {
						byte[] bytes = new byte[20];
						buffer.position(28);

						// info hash 的十六进制字符串
						buffer.get(bytes);
						String infoHash = bytArrayToHex(bytes);

						// peer id 的字符串表示 形式
						buffer.get(bytes);
						String peerId = new String(bytes);
						Logger.getDefaultLogger()
								.debug(String
										.format("Recieve peer's handshake message: info hash=[%s] peer id=[%s]",
												infoHash, peerId));
						peer.isHandshakeRecieved = true;
						if (peer.isPassive) {
							peer.sendMessage(ByteBuffer.wrap(handShakeMessage));
						}
						peer.sendMessage(PeerMessage
								.wrapBitFieldMessage(pieceManager.bitSet));
						buffer.position(0);
						buffer.limit(4);
					}
				} else {
					// 读取非握手信息
					int len = socketChannel.read(buffer);
					if (len < 0) {
						peer.close();
						selectionKey.cancel();
						peerList.remove(peer);
					} else if (buffer.position() == 4) {
						// 设置位置为0读取消息长度
						int pos = buffer.position();
						buffer.position(0);
						len = buffer.getInt();
						buffer.position(pos);
						if (len > buffer.capacity() || len < 0) {
							Logger.getDefaultLogger()
									.debug("Invalid messsage Received,close connection.");
							closePeerConnection(selectionKey);
						} else if (len == 0) {
							// keep alive消息
							buffer.position(0);
							buffer.limit(4);
						} else {
							buffer.limit(len + 4);
							socketChannel.read(buffer);
							if (buffer.position() == buffer.limit()
									|| (!peer.isBitfieldRecieved && buffer
											.position() > 4)) {
								// 一个消息读取完成
								buffer.position(4);
								byte messageId = buffer.get();
								Logger.getDefaultLogger()
										.debug(String
												.format("Receive message: id=%d, len=%d",
														(int) messageId, len));
								switch (messageId) {
								case PeerMessage.MESSAGE_BIT_FIELD:
									// 接收到对方piece位图
									int blen = len / 8;
									if (len % 8 != 0) {
										blen += 1;
									}
									byte[] bytes = new byte[blen];
									buffer.get(bytes);
									peer.bitSet = new BitSet(bytes, len);
									peer.sendMessage(PeerMessage
											.wrapInterested());
									peer.isBitfieldRecieved = true;
									peer.amInterested = true;
									break;
								case PeerMessage.MESSAGE_CANCEL:
									// 取消对某个slice的数据请求
									break;
								case PeerMessage.MESSAGE_CHOKE:
									// 如果某个peer原先是解除阻塞的，而此次被阻塞，则发送choke消息。
									peer.peerChoking = true;
									break;

								case PeerMessage.MESSAGE_HAVE:
									// 指明下标为index的piece，peer已经拥有
									int idx = buffer.getInt();
									peer.bitSet.setBit(idx);
									break;

								case PeerMessage.MESSAGE_INTERESTED:
									// 当客户端收到某peer的have消息时，如果发现peer拥有了客户端没有的piece，则发送interested消息告知该peer，客户端对它感兴趣。
									peer.amChoking = false;
									peer.peerInterested = true;
									peer.sendMessage(PeerMessage.wrapUnChoke());
									break;
								case PeerMessage.MESSAGE_NOT_INTERESTED:
									// 当客户端下载了某个piece，如果发现客户端拥有了这个piece后，某个peer拥有的所有piece，客户端都拥有，则发送not
									// interested消息给该peer
									peer.peerInterested = false;
									break;
								case PeerMessage.MESSAGE_PIECE:
									// 当客户端收到某个peer的request消息后，如果判定当前未将该peer阻塞，且peer请求的slice，客户端已经下载，则发送piece消息将文件数据上传给该peer
									// 一个piece读取完成，写文件
									int index = buffer.getInt();
									int begin = buffer.getInt();
									progress += len;
									peer.isRequestingPiece = false;
									fileWriter
											.write(pieceManager.pieces[index].littlePieces[0].fileOffset
													+ begin, buffer);
									break;

								case PeerMessage.MESSAGE_PORT:
									// 该消息只在支持DHT的客户端中才会使用，用于指明DHT监听的端口号，一般不必理会，收到该消息时，直接丢弃即可

									break;
								case PeerMessage.MESSAGE_REQUEST:
									// 当客户端收到某个peer发来的unchoke消息后，即构造request消息，向该peer发送数据请求。前面提到，peer之间交换数据是以slice（长度为16KB的块）为单位的，因此request消息中length的值一般为16K。对于一个256KB的piece，客户端分16次下载，每次下载一个16K的slice。
									break;
								case PeerMessage.MESSAGE_UNCHOKE:
									// 客户端每隔一定的时间，通常为10秒，计算一次各个peer的下载速度，如果某peer被解除阻塞，则发送unchoke消息
									peer.peerChoking = false;
									break;
								}
								buffer.position(0);
								buffer.limit(4);
							}
						}
					}
				}
			} else if (selectionKey.isAcceptable()) {
				ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey
						.channel();
				SocketChannel sChannel = serverSocketChannel.accept();
				sChannel.configureBlocking(false);
				InetSocketAddress address = (InetSocketAddress) sChannel
						.getRemoteAddress();
				Logger.getDefaultLogger().info(
						String.format("Accept connction from host=%s port=%d",
								address.getHostString(), address.getPort()));
				peer = new Peer(address);
				peer.socketChannel = sChannel;
				peer.registerEvent(selectionKey.selector());
				peer.isPassive = true;
				peerList.add(peer);
			} else if (selectionKey.isWritable()) {
				System.out.println("writeable");
				peer.sendPendingMessage();
				if (!peer.isRequestingPiece && !peer.peerChoking) {
					// 请求piece
					boolean req = false;
					for (int i = 0; i < pieceManager.pieces.length; i++) {
						if (req) {
							break;
						}
						if (peer.bitSet.isBitSet(i)) {
							for (int j = 0; j < pieceManager.pieces[i].littlePieces.length; j++) {
								if (req) {
									break;
								}
								LittlePiece littlePiece = pieceManager.pieces[i].littlePieces[j];
								if (!littlePiece.isRequesting) {
									littlePiece.isRequesting = true;
									peer.isRequestingPiece = true;
									peer.sendMessage(PeerMessage.wrapRequest(
											littlePiece.pieceIndex,
											littlePiece.pieceOffset,
											littlePiece.length));
									req = true;
								}
							}
						}
					}
				}
			}
		}
	}

	String bytArrayToHex(byte[] a) {
		StringBuilder sb = new StringBuilder();
		for (byte b : a)
			sb.append(String.format("%02x", b & 0xff));
		return sb.toString();
	}

	@Override
	public boolean isSupportRange() {
		// TODO Auto-generated method stub
		return true;
	}
}
